package com.bigfans.framework.es;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder.Field;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.FieldSortBuilder;

import com.bigfans.framework.es.request.AggregationResult;
import com.bigfans.framework.es.request.BulkInsertCriteria;
import com.bigfans.framework.es.request.CreateDocumentCriteria;
import com.bigfans.framework.es.request.CreateIndexCriteria;
import com.bigfans.framework.es.request.CreateMappingCriteria;
import com.bigfans.framework.es.request.MoreLikeThisSearchCriteria;
import com.bigfans.framework.es.request.SearchCriteria;
import com.bigfans.framework.es.request.SearchResult;
import com.bigfans.framework.utils.CollectionUtils;
import com.bigfans.framework.utils.ReflectionUtils;

/**
 * Thin convenience wrapper around an Elasticsearch {@link Client}: index
 * administration (create/delete/exists, mappings, settings, aliases), document
 * indexing (single and bulk), lookup by id, "more like this" queries and
 * criteria-driven searches with aggregation and highlight post-processing.
 *
 * <p>The client is obtained once from the supplied {@link ElasticConnectionFactory};
 * an instance created with the no-arg constructor is unusable until
 * {@link #setConnectionFactory(ElasticConnectionFactory)} has been called.
 *
 * @author lichong, 2015-07-28 08:24:38
 */
public class ElasticTemplate {

	private ElasticConnectionFactory connectionFactory;
	private Client client;
	
	/**
	 * Creates an unconfigured template; call
	 * {@link #setConnectionFactory(ElasticConnectionFactory)} before use.
	 */
	public ElasticTemplate() {
	}
	
	/**
	 * Creates a template and immediately obtains a client from the factory.
	 *
	 * @param connectionFactory source of the Elasticsearch client; must not be null
	 */
	public ElasticTemplate(ElasticConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
		this.client = connectionFactory.getConnection();
	}

	/**
	 * @return the factory this template obtained its client from, or null if none was set
	 */
	public ElasticConnectionFactory getConnectionFactory() {
		return connectionFactory;
	}

	/**
	 * Replaces the connection factory and re-obtains the client from it.
	 *
	 * @param connectionFactory source of the Elasticsearch client; must not be null
	 */
	public void setConnectionFactory(ElasticConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
		this.client = connectionFactory.getConnection();
	}

	/**
	 * Closes the underlying connection factory, if one was configured.
	 */
	public void destroy() {
		if (connectionFactory != null) {
			connectionFactory.close();
		}
	}
	
	/**
	 * Not implemented yet; currently a no-op. Intended procedure:
	 * <ol>
	 * <li>create a temporary index;</li>
	 * <li>read the old index with the scroll API and import the data into the
	 * temporary index;</li>
	 * <li>switch the index alias over.</li>
	 * </ol>
	 */
	public void reindex(){
	}

	/**
	 * Creates an index with the settings carried by the criteria and, when the
	 * criteria names an alias, points that alias at the new index.
	 *
	 * @param action index name, settings builder and optional alias
	 * @return whether the create-index request was acknowledged
	 */
	public boolean createIndex(CreateIndexCriteria action) {
		CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices()
				.prepareCreate(action.getIndexName())
				.setSettings(action.getSettingsBuilder().getSettings());
		boolean ok = createIndexRequestBuilder.execute().actionGet().isAcknowledged();
		// Only register an alias when one was actually requested; an absent
		// alias name cannot be turned into a valid alias action.
		String alias = action.getAlias();
		if (alias != null && !alias.isEmpty()) {
			createAlias(alias, action.getIndexName());
		}
		return ok;
	}
	
	/**
	 * Puts a type mapping on an index. The mapping produced by the criteria's
	 * mapping builder may be a JSON {@code String}, a {@code Map} or an
	 * {@link XContentBuilder}.
	 *
	 * @param action index name, type name and mapping builder
	 * @return whether the put-mapping request was acknowledged
	 * @throws IllegalArgumentException if the mapping is null or of an unsupported type
	 */
	public boolean createMapping(CreateMappingCriteria action) {
		Object mapping = action.getMappingBuilder().getMapping();
		PutMappingRequestBuilder requestBuilder = client.admin().indices().preparePutMapping(action.getIndexName())
				.setType(action.getType());
		if (mapping instanceof String) {
			requestBuilder.setSource(String.valueOf(mapping));
		} else if (mapping instanceof Map) {
			requestBuilder.setSource((Map) mapping);
		} else if (mapping instanceof XContentBuilder) {
			requestBuilder.setSource((XContentBuilder) mapping);
		} else {
			// Fail here with a clear message rather than sending a request
			// that carries no mapping source.
			throw new IllegalArgumentException("unsupported mapping source for type [" + action.getType() + "]: "
					+ (mapping == null ? "null" : mapping.getClass().getName()));
		}
		return requestBuilder.execute().actionGet().isAcknowledged();
	}

	/**
	 * Reads the mapping source of a type from the cluster state.
	 *
	 * @param indexname name of an existing index, as keyed in the cluster metadata
	 * @param typename  mapping type to look up
	 * @return the mapping source as a string, or null if the index has no mapping
	 *         for that type
	 */
	public String getMapping(String indexname, String typename) {
		ImmutableOpenMap<String, MappingMetaData> mappings = client.admin().cluster().prepareState().execute()
				.actionGet().getState().getMetaData().getIndices().get(indexname).getMappings();
		MappingMetaData mapping = mappings.get(typename);
		if (mapping == null) {
			return null;
		}
		return mapping.source().toString();
	}

	/**
	 * Fetches the settings of an index.
	 *
	 * @param index index name
	 * @return the settings rendered via {@code toString()}, or null if the
	 *         response holds no entry keyed by exactly this name
	 */
	public String getSettings(String index) {
		GetSettingsRequestBuilder settingsRequestBuilder = client.admin().indices().prepareGetSettings(index);
		ImmutableOpenMap<String, Settings> indexToSettings = settingsRequestBuilder.get().getIndexToSettings();
		Settings settings = indexToSettings.get(index);
		if (settings == null) {
			return null;
		}
		return settings.toString();
	}

	/**
	 * Deletes an index if it exists.
	 * 
	 * @param indexName name of the index to delete
	 * @return whether the delete was acknowledged; false if the index does not exist
	 */
	public boolean deleteIndex(String indexName) {
		if (isIndexExists(indexName)) {
			return client.admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet().isAcknowledged();
		}
		return false;
	}

	/**
	 * @param indexName index name to check
	 * @return whether the index exists
	 */
	public boolean isIndexExists(String indexName) {
		IndicesExistsResponse response = client.admin().indices()
				.exists(new IndicesExistsRequest().indices(new String[] { indexName })).actionGet();
		return response.isExists();
	}

	/**
	 * @param index index name
	 * @param type  mapping type name
	 * @return whether the type exists in the index
	 */
	public boolean isTypeExists(String index, String type) {
		TypesExistsResponse response = client.admin().indices()
				.typesExists(new TypesExistsRequest(new String[] { index }, type)).actionGet();
		return response.isExists();
	}

	/**
	 * Adds an alias that points at the given index.
	 */
	private void createAlias(String aliasName, String indexName) {
		client.admin().indices().prepareAliases().addAlias(indexName, aliasName).get();
	}

	/**
	 * @param indexAliasName alias name to check
	 * @return whether the alias exists
	 */
	public boolean isAliasExists(String indexAliasName) {
		return client.admin().indices().prepareAliasesExist(indexAliasName).get().isExists();
	}

	/**
	 * Not implemented yet; currently a no-op.
	 */
	public void bulkCreateMappings() {

	}

	/**
	 * Indexes a single document. The document source is dispatched by its
	 * runtime type exactly as in {@link #bulkInsert(BulkInsertCriteria)}.
	 *
	 * <p>Failures are propagated instead of being printed and dropped: runtime
	 * exceptions pass through unchanged, checked ones are wrapped.
	 *
	 * @param action target index, type and the document to index
	 * @throws IllegalStateException if building or executing the request fails
	 *         with a checked exception (the original exception is the cause)
	 */
	public void insert(CreateDocumentCriteria action) {
		try {
			prepareIndexRequest(action.getIndex(), action.getType(), action.getDocument()).get();
		} catch (RuntimeException e) {
			throw e;
		} catch (Exception e) {
			throw new IllegalStateException(
					"failed to index document into [" + action.getIndex() + "/" + action.getType() + "]", e);
		}
	}

	/**
	 * Indexes all documents of the criteria in one bulk request. A null or
	 * empty document list is a no-op.
	 *
	 * <p>Failures are propagated to the caller; previously the failure raised
	 * for a bulk response with errors was caught by this same method and only
	 * printed, so callers could not detect partially failed bulks.
	 *
	 * @param bulkAction target index, type and the documents to index
	 * @throws IllegalStateException if building or executing the request fails
	 *         with a checked exception (the original exception is the cause)
	 */
	public void bulkInsert(BulkInsertCriteria bulkAction) {
		List<IndexDocument> docList = bulkAction.getDocList();
		if (docList == null || docList.isEmpty()) {
			// A bulk request without actions fails validation; nothing to do.
			return;
		}
		try {
			BulkRequestBuilder bulkRequest = client.prepareBulk();
			for (IndexDocument doc : docList) {
				bulkRequest.add(this.prepareIndexRequest(bulkAction.getIndex(), bulkAction.getType(), doc));
			}
			BulkResponse bulkResponse = bulkRequest.get();
			if (bulkResponse.hasFailures()) {
				throw new ElasticSearchException(bulkResponse.buildFailureMessage());
			}
		} catch (RuntimeException e) {
			throw e;
		} catch (Exception e) {
			throw new IllegalStateException(
					"bulk insert into [" + bulkAction.getIndex() + "/" + bulkAction.getType() + "] failed", e);
		}
	}

	/**
	 * Builds an index request for one document. Supported source types are a
	 * JSON {@code String}, a {@code Map} and an {@link XContentBuilder}.
	 *
	 * @param index    target index
	 * @param type     target type
	 * @param document document providing the id and the source
	 * @return the prepared (not yet executed) index request
	 * @throws Exception if the document or its source is null, or the source is
	 *         of an unsupported type
	 */
	private IndexRequestBuilder prepareIndexRequest(String index, String type, IndexDocument document) throws Exception {
		if (document == null) {
			throw new ElasticSearchException("object or source is null, failed to index the document");
		}
		String id = document.getDocId();
		Object source = document.getSource();
		if (source == null) {
			throw new ElasticSearchException("object or source is null, failed to index the document [id: " + id + "]");
		}
		IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, id);
		if (source instanceof String) {
			indexRequestBuilder.setSource(String.valueOf(source));
		} else if (source instanceof Map) {
			indexRequestBuilder.setSource((Map) source);
		} else if (source instanceof XContentBuilder) {
			indexRequestBuilder.setSource((XContentBuilder) source);
		} else {
			// Without this the request would be sent with no source at all.
			throw new ElasticSearchException("unsupported source type [" + source.getClass().getName()
					+ "], failed to index the document [id: " + id + "]");
		}
		return indexRequestBuilder;
	}

	/**
	 * Loads a document by id and converts its source.
	 *
	 * @param index     index name
	 * @param type      type name
	 * @param docId     document id
	 * @param convertor converts the source map into the result object
	 * @return the converted document, or null if the response has no source
	 */
	public <T> T getById(String index, String type, String docId, DocumentConverter<T> convertor) {
		GetResponse response = client.prepareGet(index, type, docId).get();
		if (response.isSourceEmpty()) {
			return null;
		}
		Map<String, Object> source = response.getSource();
		return convertor.toObject(source);
	}

	/**
	 * Runs a "more like this" query that uses an already indexed document
	 * (identified by the criteria's index, type and source document id) as the
	 * example.
	 *
	 * @param action    query parameters, paging and the example document id
	 * @param convertor converts each hit's source map into the result object
	 * @return converted hits plus the total hit count
	 */
	public <T> SearchResult<T> moreLikeThisDocument(MoreLikeThisSearchCriteria action, DocumentConverter<T> convertor) {
		Item item = new Item(action.getIndex(), action.getType(), action.getSourceDocId());
		Item[] items = { item };
		MoreLikeThisQueryBuilder query = QueryBuilders.moreLikeThisQuery(action.getFields(), null, items);
		return executeMoreLikeThis(action, query, convertor);
	}

	/**
	 * Runs a "more like this" query that uses the criteria's free text as the
	 * example.
	 *
	 * @param action    query parameters, paging and the example text
	 * @param convertor converts each hit's source map into the result object
	 * @return converted hits plus the total hit count
	 */
	public <T> SearchResult<T> moreLikeThisText(MoreLikeThisSearchCriteria action, DocumentConverter<T> convertor) {
		String[] text = { action.getLikeText() };
		MoreLikeThisQueryBuilder query = QueryBuilders.moreLikeThisQuery(action.getFields(), text, null);
		return executeMoreLikeThis(action, query, convertor);
	}

	/**
	 * Applies the criteria's term/document frequency limits to the query,
	 * executes it with the criteria's paging and converts the hits.
	 */
	private <T> SearchResult<T> executeMoreLikeThis(MoreLikeThisSearchCriteria action, MoreLikeThisQueryBuilder query,
			DocumentConverter<T> convertor) {
		query.minTermFreq(action.getMinTermFreq()).maxQueryTerms(action.getMaxQueryTerm())
				.minDocFreq(action.getMinDocFreq());
		SearchRequestBuilder request = client.prepareSearch(action.getIndex()).setTypes(action.getType())
				.setQuery(query).setFrom(action.getFrom()).setSize(action.getSize());
		SearchResponse searchResponse = request.get();
		SearchHits hits = searchResponse.getHits();
		List<T> data = new ArrayList<T>();
		for (SearchHit searchHit : hits) {
			data.add(convertor.toObject(searchHit.getSourceAsMap()));
		}
		SearchResult<T> searchResult = new SearchResult<T>();
		searchResult.setData(data);
		searchResult.setTotHitis(hits.getTotalHits());
		return searchResult;
	}

	/**
	 * Executes a search described by the criteria and returns converted hits
	 * (with highlighted values written onto the result objects), the total hit
	 * count and the flattened aggregation results.
	 *
	 * @param searchCriteria index/type, queries, filters, paging, sorting,
	 *                       aggregations and highlight fields
	 * @param resultMapper   converts each hit's source map into the result object
	 * @return the populated search result
	 */
	public <T> SearchResult<T> search(SearchCriteria searchCriteria, DocumentConverter<T> resultMapper) {
		// Translate the criteria into a search request.
		SearchRequestBuilder searchRequestBuilder = prepareSearch(searchCriteria);
		// Execute the search.
		SearchResponse response = searchRequestBuilder.execute().actionGet();
		// Convert the hits.
		List<T> results = mapResult(response, searchCriteria, resultMapper);
		SearchResult<T> searchResult = new SearchResult<T>();
		searchResult.setData(results);
		searchResult.setTotHitis(response.getHits().getTotalHits());
		// Collect aggregations.
		List<AggregationResult> aggreList = this.parseAggregation(searchCriteria, response);
		searchResult.setAggregationList(aggreList);

		return searchResult;
	}

	/**
	 * Builds the search request for the criteria: types, paging, the combined
	 * bool query (main query as a {@code must} clause plus optional query
	 * filter), post filter, aggregations, sorting, minimum score and highlighting.
	 */
	private SearchRequestBuilder prepareSearch(SearchCriteria searchCriteria) {
		SearchRequestBuilder searchRequestBuilder = client.prepareSearch(searchCriteria.getIndex());
		// Restrict the search to the requested type(s).
		searchRequestBuilder.setTypes(searchCriteria.getType());

		// Use the default search type.
		searchRequestBuilder.setSearchType(SearchType.DEFAULT);
		// Enable the request cache.
		searchRequestBuilder.setRequestCache(true);
		// Paging.
		searchRequestBuilder.setFrom(searchCriteria.getFrom()).setSize(searchCriteria.getSize());

		// Combine the main query and the query filter into one bool query.
		// A criteria without a main query is allowed: passing null to must()
		// is rejected by the builder, so the clause is simply omitted.
		BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
		QueryBuilder subQueryBuilder = searchCriteria.getQuery();
		if (subQueryBuilder != null) {
			finalQuery.must(subQueryBuilder);
		}
		QueryBuilder filterQuery = searchCriteria.getQueryFilter();
		if (filterQuery != null) {
			finalQuery.filter(filterQuery);
		}
		searchRequestBuilder.setQuery(finalQuery);

		// Post filter, applied to the hits after the query has run.
		QueryBuilder filterBuilder = searchCriteria.getFilter();
		searchRequestBuilder.setPostFilter(filterBuilder);

		// Aggregations.
		List<AggregationBuilder> aggregations = searchCriteria.getAggregations();
		if (CollectionUtils.isNotEmpty(aggregations)) {
			for (AggregationBuilder aggregation : aggregations) {
				searchRequestBuilder.addAggregation(aggregation);
			}
		}

		// Sorting.
		List<FieldSortBuilder> sortList = searchCriteria.getSortList();
		if (CollectionUtils.isNotEmpty(sortList)) {
			for (FieldSortBuilder sb : sortList) {
				searchRequestBuilder.addSort(sb);
			}
			// Keep computing scores even though a custom sort is in effect.
			searchRequestBuilder.setTrackScores(true);
		}

		// Minimum score, only when a positive threshold is configured.
		if (searchCriteria.getMinScore() > 0) {
			searchRequestBuilder.setMinScore(searchCriteria.getMinScore());
		}

		// Highlighting.
		HighlightBuilder highlightBuilder = new HighlightBuilder();
		Field[] highlightedFields = searchCriteria.getHighlightFields();
		if (highlightedFields != null) {
			for (Field field : highlightedFields) {
				highlightBuilder.field(field);
			}
		}
		searchRequestBuilder.highlighter(highlightBuilder);
		return searchRequestBuilder;
	}

	/**
	 * Flattens all top-level aggregations of a response into a result list.
	 * 
	 * @param searchCriteria the criteria the search was built from (not read here)
	 * @param response       the search response
	 * @return one entry per terms bucket / max value; empty if the response has
	 *         no aggregations
	 */
	private List<AggregationResult> parseAggregation(SearchCriteria searchCriteria, SearchResponse response) {
		if (response.getAggregations() == null) {
			return Collections.emptyList();
		}
		Map<String, Aggregation> aggregationMap = response.getAggregations().asMap();
		List<AggregationResult> aggreResultList = new ArrayList<AggregationResult>();
		for (Aggregation aggregation : aggregationMap.values()) {
			this.parseAggregation(aggregation, aggreResultList);
		}
		return aggreResultList;
	}

	/**
	 * Appends the results of one aggregation to the list:
	 * <ul>
	 * <li>string/double/long terms: one entry per bucket (name, key, doc count);</li>
	 * <li>nested: recurses into the sub-aggregations;</li>
	 * <li>max: one entry with the name and the value as string;</li>
	 * <li>anything else (including unmapped terms): ignored.</li>
	 * </ul>
	 */
	private void parseAggregation(Aggregation aggregation, List<AggregationResult> aggreResultList) {
		if (aggregation instanceof Nested) {
			Aggregations nestedAggr = ((Nested) aggregation).getAggregations();
			for (Aggregation aggrInBucket : nestedAggr.asList()) {
				this.parseAggregation(aggrInBucket, aggreResultList);
			}
			return;
		}
		if (aggregation instanceof Max) {
			Max max = (Max) aggregation;
			AggregationResult aggreResult = new AggregationResult();
			aggreResult.setName(aggregation.getName());
			aggreResult.setKey(max.getValueAsString());
			aggreResultList.add(aggreResult);
			return;
		}

		InternalTerms<?, ?> terms = null;
		if (aggregation instanceof StringTerms) {
			terms = (StringTerms) aggregation;
		} else if (aggregation instanceof DoubleTerms) {
			terms = (DoubleTerms) aggregation;
		} else if (aggregation instanceof LongTerms) {
			terms = (LongTerms) aggregation;
		}
		if (terms == null) {
			// UnmappedTerms and unsupported aggregation kinds contribute nothing.
			return;
		}

		String name = terms.getName();
		List<Bucket> buckets = (List<Bucket>) terms.getBuckets();
		for (Bucket b : buckets) {
			AggregationResult result = new AggregationResult();
			result.setName(name);
			result.setDocCount(b.getDocCount());
			result.setKey(b.getKeyAsString());
			aggreResultList.add(result);
		}
	}

	/**
	 * Converts the hits of a response into result objects and, for every
	 * highlight field of the criteria that the hit actually has fragments for,
	 * overwrites the same-named property of the result object with the
	 * concatenated highlighted fragments.
	 * 
	 * @param response       the search response
	 * @param searchCriteria supplies the highlight fields that were requested
	 * @param resultMapper   converts each hit's source map into the result object
	 * @return the converted hits, in response order
	 */
	private <T> List<T> mapResult(SearchResponse response, SearchCriteria searchCriteria,
			DocumentConverter<T> resultMapper) {
		List<T> resultList = new ArrayList<T>();
		Field[] highlightFields = searchCriteria.getHighlightFields();
		boolean highlightRequested = highlightFields != null && highlightFields.length != 0;
		for (SearchHit hit : response.getHits().getHits()) {
			T obj = resultMapper.toObject(hit.getSourceAsMap());
			if (highlightRequested) {
				Map<String, HighlightField> highlighted = hit.getHighlightFields();
				for (Field field : highlightFields) {
					// The highlight map is keyed by field name, so it must be
					// probed with the name, not with the Field builder object
					// (which can never equal a String key).
					HighlightField highlightField = highlighted.get(field.name());
					if (highlightField == null) {
						continue;
					}
					// Join the fragments, which already carry the highlight tags.
					StringBuilder value = new StringBuilder(50);
					for (Text text : highlightField.fragments()) {
						value.append(text);
					}
					try {
						ReflectionUtils.setProperty(obj, field.name(), value.toString());
					} catch (Exception e) {
						// Best effort: a property that cannot be set keeps its
						// un-highlighted value and the hit is still returned.
						e.printStackTrace();
					}
				}
			}
			resultList.add(obj);
		}
		return resultList;
	}
}
